#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This script is intended to cover end to end testing for the standalone sender
and receiver executables in cast. This ensures that the basic functionality of
these executables is not impaired, such as the TLS/UDP connections and encoding
and decoding video.
"""

import argparse
import os
import pathlib
import logging
import subprocess
import sys
import time
import unittest
import ssl
from collections import namedtuple

from enum import IntEnum, IntFlag
from urllib import request

# Names of the environment variables that may be set to override test
# properties.
ROOT_ENVVAR = 'OPENSCREEN_ROOT_DIR'
BUILD_ENVVAR = 'OPENSCREEN_BUILD_DIR'
LIBAOM_ENVVAR = 'OPENSCREEN_HAVE_LIBAOM'

TEST_VIDEO_NAME = 'Contador_Glam.mp4'
# NOTE: the test video is fetched over HTTPS. Because of certificate issues in
# the legacy urllib.request API, the download code passes its own SSL context
# when opening this URL.
TEST_VIDEO_URL = ''.join(
    ['https://storage.googleapis.com/openscreen_standalone/', TEST_VIDEO_NAME])

# Maximum time, in seconds, to wait for each process to finish.
PROCESS_TIMEOUT = 15

# Open Screen test certificates expire after 3 days. The threshold is cut
# short by 8 hours to leave a margin for potential errors in time
# calculations. The value is expressed in seconds.
CERT_EXPIRY_AGE = (3 * 24 - 8) * 3600

# These properties are based on compiled settings in Open Screen, and should
# not change without updating this file.
TEST_CERT_NAME = 'generated_root_cast_receiver.crt'
TEST_KEY_NAME = 'generated_root_cast_receiver.key'
SENDER_BINARY_NAME = 'cast_sender'
RECEIVER_BINARY_NAME = 'cast_receiver'

# Log lines the receiver is expected to print regardless of the video codec.
EXPECTED_RECEIVER_MESSAGES = [
    'CastService is running.',
    'Found codec: opus (known to FFMPEG as opus)',
    'Successfully negotiated a session, creating SDL players.',
    'Receivers are currently destroying, resetting SDL players.',
]

class VideoCodec(IntEnum):
    """Video codecs that a test can ask the sender to use.

    The receiver prints a different message depending on the codec chosen.
    """
    # The integer values are used as indexes into
    # VIDEO_CODEC_SPECIFIC_RECEIVER_MESSAGES, so they must stay in sync with
    # the order of that list.
    Vp8 = 0
    Vp9 = 1
    Av1 = 2

# Receiver log lines announcing the video codec in use. The list is indexed by
# VideoCodec value, so the order here must match that enum.
VIDEO_CODEC_SPECIFIC_RECEIVER_MESSAGES = [
    'Found codec: vp8 (known to FFMPEG as vp8)',
    'Found codec: vp9 (known to FFMPEG as vp9)',
    'Found codec: libaom-av1 (known to FFMPEG as av1)',
]

# Log lines the sender is expected to print during a run.
EXPECTED_SENDER_MESSAGES = [
    'Launching Mirroring App on the Cast Receiver',
    'Max allowed media bitrate (audio + video) will be',
    'Contador_Glam.mp4 (starts in one second)...',
    'The video capturer has reached the end of the media stream.',
    'The audio capturer has reached the end of the media stream.',
    'Video complete. Exiting...',
    'Shutting down...',
]

# Explanation appended to the failure message when an expected log line is
# not found.
MISSING_LOG_MESSAGE = """Missing an expected message from either the sender
or receiver. This either means that one of the binaries misbehaved, or you
changed or deleted one of the log messages used for validation. Please ensure
that the necessary log messages are left unchanged, or update this
test suite's expectations."""

# Help text passed as the description of the argparse parser built in
# parse_args().
DESCRIPTION = """Runs end to end tests for the standalone Cast Streaming sender
and receiver. By default, this script assumes it is being run from a current
working directory inside Open Screen's source directory, and uses
<root_dir>/out/Default as the build directory. To override these, set the
OPENSCREEN_ROOT_DIR and OPENSCREEN_BUILD_DIR environment variables. If the root
directory is set and the build directory is not,
<OPENSCREEN_ROOT_DIR>/out/Default will be used. In addition, if LibAOM is
installed, one can choose to run AV1 tests by defining the
OPENSCREEN_HAVE_LIBAOM environment variable.

See below for the help output generated by the `unittest` package."""


def _set_log_level(is_verbose):
    """Configures logging to stdout at DEBUG if |is_verbose|, else at INFO."""
    if is_verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=level)


def _get_loopback_adapter_name():
    """Returns the loopback adapter name for the current platform.

    This is 'lo' on Linux and 'lo0' on Mac. None is returned for any other
    platform.
    """
    platform = sys.platform
    if platform in ('linux', 'linux2'):
        return 'lo'
    return 'lo0' if platform == 'darwin' else None


def _get_file_age_in_seconds(path):
    """Returns how many seconds ago the file at |path| was last modified.

    |path| is a pathlib.Path. A file that does not exist is treated as last
    modified at the epoch, so its reported age is the current time in seconds
    since the epoch.
    """
    # Modification times are expressed in seconds since the epoch.
    last_modified = path.stat().st_mtime if path.exists() else 0
    return time.time() - last_modified


def _get_build_paths():
    """Locates the root and build directories and the files derived from them.

    The root directory comes from the OPENSCREEN_ROOT_DIR environment variable
    when it is set to a non-empty value, and otherwise from the output of
    `git rev-parse --show-toplevel`. The build directory comes from
    OPENSCREEN_BUILD_DIR when set, and otherwise is <root>/out/Default.

    Returns:
        A BuildPaths named tuple with the fields root, build, test_video,
        cast_receiver and cast_sender; the last three are resolved paths
        inside the build directory.

    Raises:
        AssertionError: if the root or the build directory does not exist.
    """
    root_override = os.getenv(ROOT_ENVVAR)
    if root_override:
        root_path = pathlib.Path(root_override)
    else:
        root_path = pathlib.Path(
            subprocess.getoutput('git rev-parse --show-toplevel'))
    assert root_path.exists(), 'Could not find openscreen root!'

    build_override = os.getenv(BUILD_ENVVAR)
    if build_override:
        build_path = pathlib.Path(build_override)
    else:
        build_path = root_path.joinpath('out', 'Default').resolve()
    assert build_path.exists(), 'Could not find openscreen build!'

    def _in_build_dir(name):
        """Returns the resolved path of |name| inside the build directory."""
        return build_path.joinpath(name).resolve()

    BuildPaths = namedtuple(
        'BuildPaths',
        ['root', 'build', 'test_video', 'cast_receiver', 'cast_sender'])
    return BuildPaths(root=root_path,
                      build=build_path,
                      test_video=_in_build_dir(TEST_VIDEO_NAME),
                      cast_receiver=_in_build_dir(RECEIVER_BINARY_NAME),
                      cast_sender=_in_build_dir(SENDER_BINARY_NAME))


class TestFlags(IntFlag):
    """
    Bit flags for a test run, primarily used to control sender and receiver
    configuration to test different features of the standalone libraries.
    Flags may be combined with the | operator.
    """
    # Makes launch_sender() pass '-r' to the sender.
    UseRemoting = 1 << 0
    # Makes launch_sender() pass '-a' to the sender.
    UseAndroidHack = 1 << 1


class StandaloneCastTest(unittest.TestCase):
    """
    Test class for setting up and running end to end tests on the
    standalone sender and receiver binaries. This class uses the unittest
    package, so methods that are executed as tests all have names prefixed
    with "test_".

    This suite sets the current working directory to the root of the Open
    Screen repository, and references all files from the root directory.
    Generated certificates should always be in |cls.build_paths.root|.
    """

    @classmethod
    def setUpClass(cls):
        """Shared setup method for all tests, handles one-time updates.

        Resolves the build paths, changes the working directory to the
        repository root, then makes sure that the test video and the test
        credentials are available.
        """
        cls.build_paths = _get_build_paths()
        os.chdir(cls.build_paths.root)
        cls.download_video()
        cls.generate_certificates()

    @classmethod
    def download_video(cls):
        """Downloads the test video from Google storage if it is missing."""
        if os.path.exists(cls.build_paths.test_video):
            logging.debug('Video already exists, skipping download...')
            return

        logging.debug('Downloading video from %s', TEST_VIDEO_URL)
        # NOTE(review): a bare ssl.SSLContext() does not verify the server
        # certificate by default. This looks deliberate (see the note next to
        # TEST_VIDEO_URL), so it is left unchanged -- confirm it is still
        # required.
        with request.urlopen(TEST_VIDEO_URL, context=ssl.SSLContext()) as url:
            with open(cls.build_paths.test_video, 'wb') as file:
                file.write(url.read())

    @classmethod
    def generate_certificates(cls):
        """Generates test certificates using the cast receiver.

        Nothing is generated when both the certificate and the private key
        in the current working directory are younger than CERT_EXPIRY_AGE.

        Raises:
            subprocess.CalledProcessError: if the receiver exits non-zero.
        """
        cert_age = _get_file_age_in_seconds(pathlib.Path(TEST_CERT_NAME))
        key_age = _get_file_age_in_seconds(pathlib.Path(TEST_KEY_NAME))
        if cert_age < CERT_EXPIRY_AGE and key_age < CERT_EXPIRY_AGE:
            logging.debug('Credentials are up to date...')
            return

        logging.debug('Credentials out of date, generating new ones...')
        try:
            subprocess.check_output(
                [
                    cls.build_paths.cast_receiver,
                    '-g',  # Generate certificate and private key.
                    '-v'  # Enable verbose logging.
                ],
                stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            print('Generation failed with output: ', e.output.decode())
            raise

    def launch_receiver(self):
        """Launches the receiver process with discovery disabled.

        Returns:
            The subprocess.Popen object of the receiver, with stdout and
            stderr captured through pipes.
        """
        logging.debug('Launching the receiver application...')
        loopback = _get_loopback_adapter_name()
        self.assertTrue(
            loopback,
            'No known loopback adapter for platform {}'.format(sys.platform))

        #pylint: disable = consider-using-with
        return subprocess.Popen(
            [
                self.build_paths.cast_receiver,
                '-d',
                TEST_CERT_NAME,
                '-p',
                TEST_KEY_NAME,
                '-x',  # Skip discovery, only necessary on Mac OS X.
                '-v',  # Enable verbose logging.
                loopback
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

    def launch_sender(self, flags, codec=None):
        """Launches the sender process, running the test video file once.

        Args:
            flags: a TestFlags value (or a container of TestFlags members)
                selecting optional sender arguments.
            codec: optional VideoCodec to request with '-c'. When None, no
                codec argument is passed.

        Returns:
            The subprocess.Popen object of the sender, with stdout and stderr
            captured through pipes.
        """
        logging.debug('Launching the sender application...')
        command = [
            self.build_paths.cast_sender,
            '127.0.0.1:8010',
            self.build_paths.test_video,
            '-d',
            TEST_CERT_NAME,
            '-n'  # Only play the video once, and then exit.
        ]
        if TestFlags.UseAndroidHack in flags:
            command.append('-a')
        if TestFlags.UseRemoting in flags:
            command.append('-r')

        # The standalone sender sends VP8 if no codec command line argument is
        # passed. The check must be against None rather than truthiness:
        # VideoCodec.Vp8 has the value 0 and would otherwise be skipped.
        if codec is not None:
            codec_arguments = {
                VideoCodec.Vp8: 'vp8',
                VideoCodec.Vp9: 'vp9',
                VideoCodec.Av1: 'av1',
            }
            self.assertIn(codec, codec_arguments)
            command.extend(['-c', codec_arguments[codec]])

        #pylint: disable = consider-using-with
        return subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)

    def check_logs(self, logs, codec=None):
        """Checks that the outputted logs contain expected behavior.

        Args:
            logs: a (receiver log, sender log) pair of strings, as returned
                by get_output().
            codec: the VideoCodec the receiver is expected to report. When
                None, VP8 is expected.
        """
        # If a codec was not provided, we should make sure that the standalone
        # sender sent VP8.
        if codec is None:
            codec = VideoCodec.Vp8

        receiver_log, sender_log = logs

        expected_receiver_messages = EXPECTED_RECEIVER_MESSAGES + [
            VIDEO_CODEC_SPECIFIC_RECEIVER_MESSAGES[codec]
        ]
        for message in expected_receiver_messages:
            self.assertTrue(
                message in receiver_log,
                'Missing log message: {}.\n{}'.format(message,
                                                      MISSING_LOG_MESSAGE))
        for message in EXPECTED_SENDER_MESSAGES:
            self.assertTrue(
                message in sender_log,
                'Missing log message: {}.\n{}'.format(message,
                                                      MISSING_LOG_MESSAGE))

        # Every log must be checked against every error prefix.
        for name, log in (('receiver', receiver_log), ('sender', sender_log)):
            for prefix in ('[ERROR:', '[FATAL:'):
                self.assertTrue(
                    prefix not in log,
                    'Logs contained an error: found "{}" in the {} log'.format(
                        prefix, name))
        logging.debug('Finished validating log output')

    def get_output(self, flags, codec=None):
        """Launches the sender and receiver, and handles exit output.

        Args:
            flags: a TestFlags value forwarded to launch_sender().
            codec: optional VideoCodec forwarded to launch_sender().

        Returns:
            A (receiver stderr, sender stderr) tuple of decoded strings.

        Raises:
            subprocess.TimeoutExpired: if either process does not finish
                within PROCESS_TIMEOUT seconds.
        """
        launched = []
        try:
            receiver_process = self.launch_receiver()
            launched.append(receiver_process)
            logging.debug('Letting the receiver start up...')
            time.sleep(3)
            sender_process = self.launch_sender(flags, codec)
            launched.append(sender_process)

            logging.debug('Launched sender PID %i and receiver PID %i...',
                          sender_process.pid, receiver_process.pid)
            logging.debug('collating output...')
            # Only stderr (index 1) is used for log validation.
            receiver_log = receiver_process.communicate(
                timeout=PROCESS_TIMEOUT)[1].decode('utf-8')
            sender_log = sender_process.communicate(
                timeout=PROCESS_TIMEOUT)[1].decode('utf-8')
        finally:
            # communicate() does not kill the child when its timeout expires,
            # so make sure that no process outlives the test.
            for process in launched:
                if process.poll() is None:
                    process.kill()
                    process.communicate()

        # TODO(issuetracker.google.com/194292855): standalones should exit zero.
        # Remoting causes the sender to exit with code -4.
        if TestFlags.UseRemoting not in flags:
            self.assertEqual(sender_process.returncode, 0,
                             'sender had non-zero exit code')
        return (receiver_log, sender_log)

    def test_golden_case(self):
        """Tests that when settings are normal, things work end to end."""
        output = self.get_output(TestFlags(0))
        self.check_logs(output)

    def test_remoting(self):
        """Tests that basic remoting works."""
        output = self.get_output(TestFlags.UseRemoting)
        self.check_logs(output)

    def test_with_android_hack(self):
        """Tests that things work when the Android RTP hack is enabled."""
        output = self.get_output(TestFlags.UseAndroidHack)
        self.check_logs(output)

    def test_vp8_flag(self):
        """Tests that the VP8 flag works with standard settings."""
        output = self.get_output(TestFlags(0), VideoCodec.Vp8)
        self.check_logs(output, VideoCodec.Vp8)

    def test_vp9_flag(self):
        """Tests that the VP9 flag works with standard settings."""
        output = self.get_output(TestFlags(0), VideoCodec.Vp9)
        self.check_logs(output, VideoCodec.Vp9)

    @unittest.skipUnless(os.getenv(LIBAOM_ENVVAR),
                         'Skipping AV1 test since LibAOM not installed.')
    def test_av1_flag(self):
        """Tests that the AV1 flag works with standard settings."""
        output = self.get_output(TestFlags(0), VideoCodec.Av1)
        self.check_logs(output, VideoCodec.Av1)


def parse_args():
    """Parses the command line arguments and sets up the logging module.

    Only the arguments defined here are interpreted. Anything else on the
    command line (for example test names or `unittest` options) is ignored by
    this function, and sys.argv is left unmodified.
    """
    # NOTE for future developers: the `unittest` module will complain if it is
    # passed any args that it doesn't understand. If any Open Screen-specific
    # command line arguments are added in the future, they should be cropped
    # from sys.argv before |unittest.main()| is called.
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument('-v',
                        '--verbose',
                        help='enable debug logging',
                        action='store_true')

    # parse_known_args() is used instead of parse_args() so that arguments
    # meant for `unittest` do not make this parser exit with an
    # "unrecognized arguments" error.
    parsed_args, _ = parser.parse_known_args(sys.argv[1:])
    _set_log_level(parsed_args.verbose)


# Script entry point: set up logging from the command line first, then hand
# control over to the unittest runner.
if __name__ == "__main__":
    parse_args()
    unittest.main()
